import json
import logging
import os.path as p
import random
import socket
import subprocess
import threading
import time

import kafka.errors
import pytest
from kafka import BrokerConnection, KafkaAdminClient, KafkaConsumer, KafkaProducer
from kafka.admin import NewTopic
from kafka.protocol.admin import DescribeGroupsRequest_v1, DescribeGroupsResponse_v1
from kafka.protocol.group import MemberAssignment

from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster, is_arm
from helpers.network import PartitionManager
from helpers.test_tools import TSV

if is_arm():
    # skip due to no arm support for clickhouse/kerberos-kdc docker image
    pytestmark = pytest.mark.skip

cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
    "instance",
    main_configs=["configs/kafka.xml"],
    user_configs=["configs/users.xml"],
    with_kerberized_kafka=True,
    clickhouse_path_dir="clickhouse_path",
)


def producer_serializer(x):
    return x.encode() if isinstance(x, str) else x


def get_kafka_producer(port, serializer):
    errors = []
    for _ in range(15):
        try:
            producer = KafkaProducer(
                bootstrap_servers="localhost:{}".format(port),
                value_serializer=serializer,
            )
            logging.debug("Kafka Connection establised: localhost:{}".format(port))
            return producer
        except Exception as e:
            errors += [str(e)]
            time.sleep(1)

    raise Exception("Connection not establised, {}".format(errors))


def kafka_produce(kafka_cluster, topic, messages, timestamp=None):
    logging.debug(
        "kafka_produce server:{}:{} topic:{}".format(
            "localhost", kafka_cluster.kerberized_kafka_port, topic
        )
    )
    producer = get_kafka_producer(
        kafka_cluster.kerberized_kafka_port, producer_serializer
    )
    for message in messages:
        producer.send(topic=topic, value=message, timestamp_ms=timestamp)
        producer.flush()


# Fixtures


@pytest.fixture(scope="module")
def kafka_cluster():
    try:
        cluster.start()
        if instance.is_debug_build():
            # https://github.com/ClickHouse/ClickHouse/issues/27651
            pytest.skip(
                "librdkafka calls system function for kinit which does not pass harmful check in debug build"
            )
        yield cluster
    finally:
        cluster.shutdown()


@pytest.fixture(autouse=True)
def kafka_setup_teardown():
    instance.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
    yield  # run test


# Tests


def test_kafka_json_as_string(kafka_cluster):
    kafka_produce(
        kafka_cluster,
        "kafka_json_as_string",
        [
            '{"t": 123, "e": {"x": "woof"} }',
            "",
            '{"t": 124, "e": {"x": "test"} }',
            '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}',
        ],
    )

    instance.query(
        """
        CREATE TABLE test.kafka (field String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
                     kafka_topic_list = 'kafka_json_as_string',
                     kafka_commit_on_select = 1,
                     kafka_group_name = 'kafka_json_as_string',
                     kafka_format = 'JSONAsString',
                     kafka_flush_interval_ms=1000;
        """
    )

    time.sleep(3)

    result = instance.query("SELECT * FROM test.kafka;")
    expected = """\
{"t": 123, "e": {"x": "woof"} }
{"t": 124, "e": {"x": "test"} }
{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
"""
    assert TSV(result) == TSV(expected)
    assert instance.contains_in_log(
        "Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows"
    )


def test_kafka_json_as_string_request_new_ticket_after_expiration(kafka_cluster):
    # Ticket should be expired after the wait time
    # On run of SELECT query new ticket should be requested and SELECT query should run fine.

    kafka_produce(
        kafka_cluster,
        "kafka_json_as_string_after_expiration",
        [
            '{"t": 123, "e": {"x": "woof"} }',
            "",
            '{"t": 124, "e": {"x": "test"} }',
            '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}',
        ],
    )

    instance.query(
        """
        CREATE TABLE test.kafka (field String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
                     kafka_topic_list = 'kafka_json_as_string_after_expiration',
                     kafka_commit_on_select = 1,
                     kafka_group_name = 'kafka_json_as_string_after_expiration',
                     kafka_format = 'JSONAsString',
                     kafka_flush_interval_ms=1000;
        """
    )

    time.sleep(45)  # wait for ticket expiration

    result = instance.query("SELECT * FROM test.kafka;")
    expected = """\
{"t": 123, "e": {"x": "woof"} }
{"t": 124, "e": {"x": "test"} }
{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
"""
    assert TSV(result) == TSV(expected)
    assert instance.contains_in_log(
        "Parsing of message (topic: kafka_json_as_string_after_expiration, partition: 0, offset: 1) return no rows"
    )


def test_kafka_json_as_string_no_kdc(kafka_cluster):
    # When the test is run alone (not preceded by any other kerberized kafka test),
    # we need a ticket to
    # assert instance.contains_in_log("Ticket expired")
    instance.query(
        """
        CREATE TABLE test.kafka_no_kdc_warm_up (field String)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
                     kafka_topic_list = 'kafka_json_as_string_no_kdc_warm_up',
                     kafka_group_name = 'kafka_json_as_string_no_kdc_warm_up',
                     kafka_commit_on_select = 1,
                     kafka_format = 'JSONAsString',
                     kafka_flush_interval_ms=1000;
        """
    )

    instance.query("SELECT * FROM test.kafka_no_kdc_warm_up;")

    kafka_produce(
        kafka_cluster,
        "kafka_json_as_string_no_kdc",
        [
            '{"t": 123, "e": {"x": "woof"} }',
            "",
            '{"t": 124, "e": {"x": "test"} }',
            '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}',
        ],
    )

    # temporary prevent CH - KDC communications
    with PartitionManager() as pm:
        other_node = "kafka_kerberos"
        for node in kafka_cluster.instances.values():
            source = node.ip_address
            destination = kafka_cluster.get_instance_ip(other_node)
            logging.debug(f"partitioning source {source}, destination {destination}")
            pm.add_rule(
                {
                    "instance": node,
                    "source": source,
                    "destination": destination,
                    "action": "REJECT",
                    "protocol": "all",
                }
            )

        time.sleep(45)  # wait for ticket expiration

        instance.query(
            """
            CREATE TABLE test.kafka_no_kdc (field String)
                ENGINE = Kafka
                SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
                         kafka_topic_list = 'kafka_json_as_string_no_kdc',
                         kafka_group_name = 'kafka_json_as_string_no_kdc',
                         kafka_commit_on_select = 1,
                         kafka_format = 'JSONAsString',
                         kafka_flush_interval_ms=1000;
            """
        )

        result = instance.query("SELECT * FROM test.kafka_no_kdc;")
    expected = ""

    assert TSV(result) == TSV(expected)
    assert instance.contains_in_log("StorageKafka (test\.kafka_no_kdc): Nothing to commit")
    assert instance.contains_in_log("Ticket expired")
    assert instance.contains_in_log("KerberosInit failure:")


def test_kafka_config_from_sql_named_collection(kafka_cluster):
    kafka_produce(
        kafka_cluster,
        "kafka_json_as_string_named_collection",
        [
            '{"t": 123, "e": {"x": "woof"} }',
            "",
            '{"t": 124, "e": {"x": "test"} }',
            '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}',
        ],
    )

    instance.query(
        """
        DROP NAMED COLLECTION IF EXISTS kafka_config;
        CREATE NAMED COLLECTION kafka_config AS
            kafka.security_protocol = 'SASL_PLAINTEXT',
            kafka.sasl_mechanism = 'GSSAPI',
            kafka.sasl_kerberos_service_name = 'kafka',
            kafka.sasl_kerberos_keytab = '/tmp/keytab/clickhouse.keytab',
            kafka.sasl_kerberos_principal = 'anotherkafkauser/instance@TEST.CLICKHOUSE.TECH',
            kafka.debug = 'security',
            kafka.api_version_request = 'false',

            kafka_broker_list = 'kerberized_kafka1:19092',
            kafka_topic_list = 'kafka_json_as_string_named_collection',
            kafka_commit_on_select = 1,
            kafka_group_name = 'kafka_json_as_string_named_collection',
            kafka_format = 'JSONAsString',
            kafka_flush_interval_ms=1000;
        """
    )
    instance.query(
        """
        CREATE TABLE test.kafka (field String)
            ENGINE = Kafka(kafka_config);
        """
    )

    time.sleep(3)

    result = instance.query("SELECT * FROM test.kafka;")
    expected = """\
{"t": 123, "e": {"x": "woof"} }
{"t": 124, "e": {"x": "test"} }
{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
"""
    assert TSV(result) == TSV(expected)
    assert instance.contains_in_log(
        "Parsing of message (topic: kafka_json_as_string_named_collection, partition: 0, offset: 1) return no rows"
    )


if __name__ == "__main__":
    cluster.start()
    input("Cluster created, press any key to destroy...")
    cluster.shutdown()
